
add fanout to tpu-client-next #3478

Merged: 10 commits merged into anza-xyz:master on Nov 7, 2024

Conversation

KirillLykov

Problem

The tpu-client-next crate does not support fanout yet.

Summary of Changes

Add fanout support to tpu-client-next.

@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch from 107384f to f62dee2 Compare November 5, 2024 10:47
tpu-client-next/tests/connection_workers_scheduler_test.rs (outdated, resolved)
tpu-client-next/src/connection_workers_scheduler.rs (outdated, resolved)
// Regardless of who is leader, add future leaders to the cache to
// hide the latency of opening the connection.
tokio::select! {
send_res = workers.send_transactions_to_address(new_leader, transaction_batch.clone()) => match send_res {


Right now this will block if the corresponding worker's channel is full, so
effectively the slowest leader will slow down all other leaders.

I think we should 1) increase the channel size and 2) add a
try_send_transactions_to_address that returns an error if the channel is full.
If the channel is full, I guess as a start we could log a warning and drop the
batch for that leader, but at least we don't slow down the other leaders.

Then longer term, we need to think of a better API so that the caller of the
crate can decide what should happen: drop the batch? increase the channel size?
slow down upstream?

@KirillLykov (Author) commented Nov 6, 2024

This makes sense. I'm not sure there is value in having both send_transactions and try_send_transactions in the context of this crate, so probably just modify the code to use try_. This also allows us to reduce the select!.

@KirillLykov (Author) commented Nov 6, 2024

The channel size, worker_channel_size, is configurable through the Scheduler config. The only question is what value should be used in practice, for STS for example.

@KirillLykov (Author) commented Nov 7, 2024

One thing that worries me is the transaction-bench scenario. There we actually need backpressure (which the worker_channel.send().await provided), because it tries to generate as many transaction batches as possible and we rely on the channel to slow down sending them.
So I want to add a configuration flag controlling whether try_send or send is used.

@KirillLykov (Author)

In the case of STS, try_send is fine: even if we drop some transactions silently, there is a retry thread which sends them again anyway.
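
For illustration, here is a minimal sketch of the non-blocking path discussed in this thread: a worker cache whose try_send-style method maps a full or closed channel to an error instead of awaiting, so one slow leader cannot stall the others. The type names, the batch type, and the method shape are placeholders, not the crate's actual API.

    use std::{collections::HashMap, net::SocketAddr};
    use tokio::sync::mpsc::{self, error::TrySendError};

    // Placeholder types; the real WorkersCache and its error enum differ in detail.
    pub enum WorkersCacheError {
        FullChannel,
        ReceiverDropped,
    }

    pub struct WorkersCache {
        workers: HashMap<SocketAddr, mpsc::Sender<Vec<Vec<u8>>>>,
    }

    impl WorkersCache {
        /// Non-blocking send: if the worker's channel is full, report an error
        /// instead of awaiting, so the batch for that leader can be dropped
        /// (and, in the STS case, picked up again by its retry thread).
        pub fn try_send_transactions_to_address(
            &self,
            peer: &SocketAddr,
            batch: Vec<Vec<u8>>,
        ) -> Result<(), WorkersCacheError> {
            let sender = self
                .workers
                .get(peer)
                .ok_or(WorkersCacheError::ReceiverDropped)?;
            sender.try_send(batch).map_err(|err| match err {
                TrySendError::Full(_) => WorkersCacheError::FullChannel,
                TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
            })
        }
    }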

tpu-client-next/src/connection_workers_scheduler.rs (outdated, resolved)
@@ -35,7 +35,7 @@ pub trait LeaderUpdater: Send {
/// If the current leader estimation is incorrect and transactions are sent to
/// only one estimated leader, there is a risk of losing all the transactions,
/// depending on the forwarding policy.
-fn next_leaders(&self, lookahead_slots: u64) -> Vec<SocketAddr>;
+fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr>;


why this change?

@KirillLykov (Author)

Because to implement this trait in STS I need to be able to mutate self.
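
To make the motivation concrete, here is a toy implementer of the changed signature (trait reduced to the single method shown in the diff): it needs &mut self because it advances internal state on every call. This is only a sketch, not the actual SendTransactionService updater.

    use std::net::SocketAddr;

    pub trait LeaderUpdater: Send {
        fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr>;
    }

    /// Illustrative implementer: rotates a cursor over a fixed leader list, so
    /// each call mutates `self`.
    struct RotatingLeaderUpdater {
        leaders: Vec<SocketAddr>,
        cursor: usize,
    }

    impl LeaderUpdater for RotatingLeaderUpdater {
        fn next_leaders(&mut self, lookahead_slots: u64) -> Vec<SocketAddr> {
            let count = (lookahead_slots as usize).min(self.leaders.len());
            let start = self.cursor;
            // Mutating the cursor is what forces `&mut self` here.
            self.cursor = (self.cursor + 1) % self.leaders.len().max(1);
            self.leaders.iter().cycle().skip(start).take(count).cloned().collect()
        }
    }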

@KirillLykov added the v2.1 (Backport to v2.1 branch) label on Nov 5, 2024

mergify bot commented Nov 5, 2024

Backports to the beta branch are to be avoided unless absolutely necessary for fixing bugs, security issues, and perf regressions. Changes intended for backport should be structured such that a minimum effective diff can be committed separately from any refactoring, plumbing, cleanup, etc that are not strictly necessary to achieve the goal. Any of the latter should go only into master and ride the normal stabilization schedule. Exceptions include CI/metrics changes, CLI improvements and documentation updates on a case by case basis.

@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch 2 times, most recently from 263d796 to c0fd36e Compare November 6, 2024 16:01
@KirillLykov (Author)

@alessandrod all addressed except for 0rtt (next time)

tpu-client-next/src/connection_workers_scheduler.rs (outdated, resolved)
Comment on lines 188 to 192
// Define the non-atomic struct and the `to_non_atomic` conversion method
define_non_atomic_struct!(
SendTransactionStatsNonAtomic,
SendTransactionStats,
{


minor

I would call the macro define_non_atomic_version_for!, just to be extra clear that it creates a struct based on another struct.

Suggested change
-// Define the non-atomic struct and the `to_non_atomic` conversion method
-define_non_atomic_struct!(
-    SendTransactionStatsNonAtomic,
-    SendTransactionStats,
-    {
+// Define the non-atomic struct and the `to_non_atomic` conversion method
+define_non_atomic_version_for!(
+    SendTransactionStats,
+    SendTransactionStatsNonAtomic,
+    {
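
To make the rename suggestion concrete, here is a rough sketch of what such a declarative macro could look like: given the atomic stats struct and an explicit field list, it generates the plain mirror struct plus a to_non_atomic() snapshot method. The field names and exact argument shape are assumptions for illustration; the crate's real macro may differ.

    use std::sync::atomic::{AtomicU64, Ordering};

    macro_rules! define_non_atomic_version_for {
        ($atomic:ident, $non_atomic:ident, { $($field:ident),* $(,)? }) => {
            #[derive(Debug, Default, Clone, PartialEq)]
            pub struct $non_atomic {
                $(pub $field: u64,)*
            }

            impl $atomic {
                /// Take a plain snapshot of the atomic counters.
                pub fn to_non_atomic(&self) -> $non_atomic {
                    $non_atomic {
                        $($field: self.$field.load(Ordering::Relaxed),)*
                    }
                }
            }
        };
    }

    #[derive(Default)]
    pub struct SendTransactionStats {
        pub successfully_sent: AtomicU64,
        pub connection_error_timed_out: AtomicU64,
    }

    define_non_atomic_version_for!(
        SendTransactionStats,
        SendTransactionStatsNonAtomic,
        { successfully_sent, connection_error_timed_out }
    );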

Comment on lines 38 to 39
mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel,
mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,


style
Consider importing mpsc::error::TrySendError.

Suggested change
-mpsc::error::TrySendError::Full(_) => WorkersCacheError::FullChannel,
-mpsc::error::TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,
+use {
+    /* ... */
+    tokio::{sync::mpsc::{self, error::TrySendError}, task::JoinHandle},
+    /* ... */
+};
+/* ... */
+TrySendError::Full(_) => WorkersCacheError::FullChannel,
+TrySendError::Closed(_) => WorkersCacheError::ReceiverDropped,

}

/// Sends a batch of transactions to the worker for a given peer. If the
/// worker for the peer is disconnected or fails, it is removed from the
/// cache.
-pub async fn send_transactions_to_address(
+pub(crate) fn try_send_transactions_to_address(


minor

Unrelated to this PR, but the address suffix in the name is probably unnecessary.
As the call will have the address as the first argument, it is probably readable enough if the function is called try_send_transactions() or try_send_transactions_to().

@KirillLykov (Author)

I will not touch this in this PR, to minimize the number of changes (and simplify the backport to v2.1).

Comment on lines 183 to 185
if let Some(worker) = worker {
tokio::spawn(async move {
let leader = worker.leader();


style

You can reduce the whole body indentation with a let/else:

Suggested change
-if let Some(worker) = worker {
-    tokio::spawn(async move {
-        let leader = worker.leader();
+let Some(worker) = worker else {
+    return;
+};
+tokio::spawn(async move {
+    let leader = worker.leader();


pub(crate) fn maybe_shutdown_worker(worker: Option<ShutdownWorker>) {
    if let Some(worker) = worker {
        tokio::spawn(async move {


Spawning tasks without observing their execution is, I think, a bit fragile.

The executor will keep track of them, but it is not clear from the rest of the code what is going on.
And if those tasks take a long time to complete, or even hang, that would only show up as the executor taking a long time to finish, or hanging.

I do not fully understand the end-to-end logic here, so it could be hard to track.
But I suggest you consider putting all the shutdown tasks in a FuturesUnordered or something like that, allowing you to track and even interrupt the shutdown process.

In particular, it would allow the shutdown errors to be processed in a centralized location, rather than just being printed as warnings.
Though again, I am not sure you really need this feature. But maybe for collecting the stats?


> But I suggest you consider putting all the shutdown tasks in a
> FuturesUnordered or something like that, allowing you to track and even
> interrupt the shutdown process.

We explicitly don't want to do this though: we don't want cleanup to delay sending
transactions, so we do need tasks. We could have a JoinSet and call
poll_join_next to poll without blocking and pop finished tasks off the join set, although
that seems like extra work for little gain. Background tasks don't block the runtime: https://docs.rs/tokio/latest/tokio/runtime/struct.Runtime.html#shutdown
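
For reference, a rough sketch of the JoinSet variant mentioned here: spawned shutdowns still run in the background, but finished ones are reaped without blocking the send loop and the remainder can be awaited at the very end. This assumes a tokio version that provides JoinSet::try_join_next and is not how the crate currently handles it.

    use tokio::task::JoinSet;

    struct ShutdownTasks {
        tasks: JoinSet<()>,
    }

    impl ShutdownTasks {
        fn new() -> Self {
            Self { tasks: JoinSet::new() }
        }

        /// Spawn a shutdown future; like today, it runs in the background.
        fn push<F>(&mut self, fut: F)
        where
            F: std::future::Future<Output = ()> + Send + 'static,
        {
            self.tasks.spawn(fut);
        }

        /// Reap already-finished shutdowns without awaiting, so completions
        /// (and panics) are at least observed by the main loop.
        fn reap_finished(&mut self) {
            while let Some(res) = self.tasks.try_join_next() {
                if let Err(err) = res {
                    eprintln!("worker shutdown task failed: {err}");
                }
            }
        }

        /// On scheduler shutdown, wait for the remaining cleanup to complete.
        async fn join_all(mut self) {
            while let Some(res) = self.tasks.join_next().await {
                if let Err(err) = res {
                    eprintln!("worker shutdown task failed: {err}");
                }
            }
        }
    }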

@@ -433,10 +436,10 @@ async fn test_staked_connection() {

// Wait for the exchange to finish.
tx_sender_shutdown.await;
-let localhost_stats = join_scheduler(scheduler_handle).await;
+let localhost_stats = join_scheduler(scheduler_handle).await.to_non_atomic();


Would it make sense to put the .to_non_atomic() call inside join_scheduler()?
Or do we not want it there for the non-test case?

@KirillLykov (Author) commented Nov 7, 2024

You are right; doing it this way also allows us to avoid cloning.

@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch 2 times, most recently from 1341c0c to 2dc802c Compare November 7, 2024 07:52
@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch 2 times, most recently from 3ee1853 to 448d476 Compare November 7, 2024 13:27
-// Regardless of who is leader, add future leaders to the cache to
-// hide the latency of opening the connection.
+// add future leaders to the cache to hide the latency of opening the
+// connection.
for peer in future_leaders {
@KirillLykov (Author)

Here it establishes connections to future leaders.

@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch from 448d476 to 958de6f Compare November 7, 2024 14:14
/// This enum defines how many of the discovered leaders we will send
/// transactions to.
pub enum LeadersFanout {
    /// Send transactions to all the leaders discovered by the `next_leaders`
    /// call.
    All,
    /// Send transactions to the first selected number of leaders discovered by
    /// the `next_leaders` call.
-    Next(usize),
+    Next(Fanout),


You don't like Next { send: usize, connect: usize}?
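
For illustration, a minimal sketch of the shape being discussed here: separate send and connect counts, with split_leaders returning two overlapping prefixes of the discovered leaders (the connect set includes the send set, matching the doc comment later in the diff). Names follow the discussion; the crate's actual definitions may differ.

    use std::net::SocketAddr;

    pub struct Fanout {
        /// How many upcoming leaders to actually send transactions to.
        pub send: usize,
        /// How many upcoming leaders to keep warm connections for.
        pub connect: usize,
    }

    /// Split discovered leaders into the send portion and the (larger)
    /// connect portion; assumes `connect >= send`.
    fn split_leaders<'a>(
        leaders: &'a [SocketAddr],
        fanout: &Fanout,
    ) -> (&'a [SocketAddr], &'a [SocketAddr]) {
        let send_count = fanout.send.min(leaders.len());
        let connect_count = fanout.connect.min(leaders.len());
        (&leaders[..send_count], &leaders[..connect_count])
    }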

@KirillLykov KirillLykov force-pushed the klykov/add-fanout-to-client-next branch from 958de6f to 5fbfddd Compare November 7, 2024 15:40
@KirillLykov added the automerge (Merge this Pull Request automatically once CI passes) label on Nov 7, 2024

@alessandrod alessandrod left a comment


Approving with a nit. Feel free to fix it in one of the follow-ups if you don't want to do another CI run.

}

for new_leader in fanout_leaders {
    if !workers.contains(new_leader) {


nit: I don't think that this can ever happen? I'd remove the code


This can happen if a connection to the leader is dropped and the worker is stopped.
This arm in the send_err match case below:

                    Err(WorkersCacheError::ReceiverDropped) => {
                        // Remove the worker from the cache, if the peer has disconnected.
                        maybe_shutdown_worker(workers.pop(*new_leader));
                    }

It is possible for the fanout_leaders to contain duplicates.
The duplicate would not be able to get a matching worker.

@mergify mergify bot merged commit 2a618b5 into anza-xyz:master Nov 7, 2024
52 checks passed
mergify bot pushed a commit that referenced this pull request Nov 7, 2024
* Add tpu-client-next to the root Cargo.toml

* Change LeaderUpdater trait to accept mut self

* add fanout to the tpu-client-next

* Shutdown in separate task

* Use try_send instead, minor impromenets

* fix LeaderUpdaterError traits

* improve lifetimes in split_leaders

Co-authored-by: Illia Bobyr <[email protected]>

* address PR comments

* create connections in advance

* removed lookahead_slots

---------

Co-authored-by: Illia Bobyr <[email protected]>
(cherry picked from commit 2a618b5)

# Conflicts:
#	Cargo.toml
Comment on lines +236 to +237
/// * the second vector contains the leaders, used to warm up connections. This
/// slice includes the the first set.


minor

Suggested change
-/// * the second vector contains the leaders, used to warm up connections. This
-/// slice includes the the first set.
+/// * the second slice contains the leaders, used to warm up connections. This
+/// slice includes the first set.

let worker = Self::spawn_worker(
    &endpoint,
    peer,
    worker_channel_size,
    skip_check_transaction_age,
    max_reconnect_attempts,
    stats.clone(),


minor

This clone() should be unnecessary - I do not see stats used in this block anymore.

Suggested change
-stats.clone(),
+stats,

@KirillLykov (Author)

weird that it passed clippy

Comment on lines +139 to +140
let (fanout_leaders, connect_leaders) =
split_leaders(&updated_leaders, &leaders_fanout);


It is a bit confusing that we call them the send and connect portions elsewhere, but here they are called the fanout and connection portions.

Maybe it would be more consistent to call them send_leaders and connect_leaders here as well?

Suggested change
-let (fanout_leaders, connect_leaders) =
-    split_leaders(&updated_leaders, &leaders_fanout);
+let (send_leaders, connect_leaders) =
+    split_leaders(&updated_leaders, &leaders_fanout);

Comment on lines +158 to +159
for new_leader in fanout_leaders {
if !workers.contains(new_leader) {


Similar to the fanout-to-send rename above, the new_leader name is a bit confusing to me here.
I would expect the new_leader name to imply we are going to open a connection to this leader or start a worker for it.

But we actually start workers in the block above.

Maybe call it send_to instead?
Or some other name that indicates that this is only a destination for the next transaction batch.
It could just as well be the same leader as in the previous slot group (see footnote 1).

Suggested change
-for new_leader in fanout_leaders {
-    if !workers.contains(new_leader) {
+for send_to in send_leaders {
+    if !workers.contains(send_to) {

Footnotes

  1. Is there a name for a sequence of NUM_CONSECUTIVE_LEADER_SLOTS slots?


@KirillLykov (Author)

@ilya-bobyr yeah, I'll do these renamings in a follow-up, and I also need to think about how to provide backpressure for sending transactions. It is not obvious to me so far, but this backpressure is not needed for SendTransactionService, which is the priority for now; it is only needed for transaction-bench.
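
As a sketch of the flag idea mentioned in this thread (blocking send for transaction-bench backpressure versus try_send for STS, which has its own retry loop), the choice could be expressed roughly like this; the enum and function are hypothetical, not part of the crate's API.

    use std::net::SocketAddr;
    use tokio::sync::mpsc;

    #[derive(Clone, Copy)]
    pub enum SendMode {
        /// `send().await`: a full worker channel slows the producer down.
        Backpressure,
        /// `try_send()`: a full worker channel drops the batch for that leader only.
        DropOnFull,
    }

    async fn forward_batch(
        mode: SendMode,
        sender: &mpsc::Sender<Vec<Vec<u8>>>,
        peer: SocketAddr,
        batch: Vec<Vec<u8>>,
    ) {
        match mode {
            SendMode::Backpressure => {
                if sender.send(batch).await.is_err() {
                    eprintln!("worker for {peer} has shut down");
                }
            }
            SendMode::DropOnFull => {
                if let Err(err) = sender.try_send(batch) {
                    eprintln!("dropping batch for {peer}: {err}");
                }
            }
        }
    }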

alessandrod pushed a commit that referenced this pull request Nov 25, 2024
* add fanout to tpu-client-next (#3478)

* Add tpu-client-next to the root Cargo.toml

* Change LeaderUpdater trait to accept mut self

* add fanout to the tpu-client-next

* Shutdown in separate task

* Use try_send instead, minor impromenets

* fix LeaderUpdaterError traits

* improve lifetimes in split_leaders

Co-authored-by: Illia Bobyr <[email protected]>

* address PR comments

* create connections in advance

* removed lookahead_slots

---------

Co-authored-by: Illia Bobyr <[email protected]>
(cherry picked from commit 2a618b5)

# Conflicts:
#	Cargo.toml

* resolve the conflict

---------

Co-authored-by: kirill lykov <[email protected]>
Labels: automerge (Merge this Pull Request automatically once CI passes), v2.1 (Backport to v2.1 branch)